Fixes #27150: Bulk-fetch TaskInstances per DAG to eliminate N+1 in yield_pipeline_status #27152

Merged
pmbrull merged 9 commits into open-metadata:main from RajdeepKushwaha5:fix/airflow-n1-task-instances-bulk-query
Apr 20, 2026
@RajdeepKushwaha5
Contributor

Describe your changes:

Fixes #27150

AirflowSource.yield_pipeline_status was calling get_task_instances once per DagRun inside a loop — one separate SELECT ... FROM task_instance WHERE dag_id = ? AND run_id = ? per run. With the default numberOfStatus=10 and N DAGs this produces N × 10 extra DB round-trips on every ingestion run.

Root cause: get_task_instances accepted a single run_id string, so the only way to retrieve task instances for multiple runs was a loop.

Fix:

| | Before | After |
|---|---|---|
| get_task_instances signature | run_id: str → List[OMTaskInstance] | run_ids: List[str] → Dict[str, List[OMTaskInstance]] |
| DB filter | TaskInstance.run_id == run_id | TaskInstance.run_id.in_(run_ids) |
| Grouping | N separate queries, results consumed immediately | Single query, results grouped into defaultdict(list) by run_id |
| yield_pipeline_status | Calls get_task_instances inside for dag_run in dag_run_list | Collects all run_ids → calls get_task_instances once → looks up with tasks_by_run_id.get(run_id, []) |

Task-instance query count per DAG drops from numberOfStatus to 1.
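The two query shapes can be sketched with the standard-library sqlite3 module. This is an illustration only, not the code from the PR: the simplified table layout, the helper names fetch_per_run and fetch_bulk, and the sample rows are all assumptions, and the real implementation goes through SQLAlchemy with TaskInstance.run_id.in_(run_ids).

```python
import sqlite3
from collections import defaultdict
from typing import Dict, List

# Hypothetical, simplified stand-in for Airflow's task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("dag_a", "run_1", "extract"),
        ("dag_a", "run_1", "load"),
        ("dag_a", "run_2", "extract"),
        ("dag_b", "run_1", "other"),
    ],
)


def fetch_per_run(dag_id: str, run_ids: List[str]) -> Dict[str, List[str]]:
    """N+1 shape: one SELECT per run_id."""
    out: Dict[str, List[str]] = {}
    for run_id in run_ids:
        rows = conn.execute(
            "SELECT task_id FROM task_instance "
            "WHERE dag_id = ? AND run_id = ? ORDER BY task_id",
            (dag_id, run_id),
        ).fetchall()
        out[run_id] = [task_id for (task_id,) in rows]
    return out


def fetch_bulk(dag_id: str, run_ids: List[str]) -> Dict[str, List[str]]:
    """Bulk shape: a single SELECT ... IN (...), grouped in Python."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    if not run_ids:  # never emit an empty IN ()
        return dict(grouped)
    placeholders = ", ".join("?" for _ in run_ids)
    rows = conn.execute(
        "SELECT run_id, task_id FROM task_instance "
        f"WHERE dag_id = ? AND run_id IN ({placeholders}) ORDER BY task_id",
        (dag_id, *run_ids),
    ).fetchall()
    for run_id, task_id in rows:
        grouped[run_id].append(task_id)
    return dict(grouped)


run_ids = ["run_1", "run_2", "run_3"]  # run_3 has no task instances
tasks_by_run_id = fetch_bulk("dag_a", run_ids)
for run_id in run_ids:
    # Same lookup pattern the PR describes: missing runs fall back to [].
    print(run_id, tasks_by_run_id.get(run_id, []))
```

In this sketch fetch_per_run issues len(run_ids) statements while fetch_bulk issues one, and both yield the same task lists per run once the .get(run_id, []) fallback is applied.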

Error handling preserved: result (defaultdict(list)) is initialised before the try block, so on DB exception the method returns an empty dict (same safe fallback behaviour as before — all runs get empty task lists).
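A minimal sketch of that fallback pattern, assuming a hypothetical group_task_ids helper and an injected query callable in place of the real database session (neither name comes from the PR):

```python
import logging
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

logger = logging.getLogger(__name__)


def group_task_ids(
    run_ids: List[str],
    query: Callable[[List[str]], Iterable[Tuple[str, str]]],
) -> Dict[str, List[str]]:
    # Initialised before the try block so it is always bound on return.
    result: Dict[str, List[str]] = defaultdict(list)
    try:
        for run_id, task_id in query(run_ids):
            result[run_id].append(task_id)
    except Exception as exc:  # broad on purpose: log and fall back
        logger.warning("Could not fetch task instances: %s", exc)
    return result


def failing_query(run_ids: List[str]) -> Iterable[Tuple[str, str]]:
    """Hypothetical query that simulates a database failure."""
    raise RuntimeError("database unavailable")


tasks_by_run_id = group_task_ids(["run_1", "run_2"], failing_query)
print(tasks_by_run_id.get("run_1", []))  # every run falls back to an empty list
```

In this sketch, an exception raised partway through iteration would leave the rows grouped so far in result; whether that matters depends on how the real query is consumed.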

How I tested:

  • Syntax verified with ast.parse
  • Manually traced the lookup path: run_ids is derived from the same dag_run_list used in the iteration, so no run_id can be present in the loop but absent from the bulk query
  • Empty-run_ids guard (if run_ids else {}) avoids issuing IN () which is illegal in some SQL dialects

Type of change:

  • Bug fix

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes #NNNNN: Bulk-fetch TaskInstances per DAG to eliminate N+1 in yield_pipeline_status
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.
  • I have added a test that covers the exact scenario we are fixing.

@RajdeepKushwaha5 RajdeepKushwaha5 requested a review from a team as a code owner April 8, 2026 04:19
Copilot AI review requested due to automatic review settings April 8, 2026 04:19
@github-actions
Contributor

github-actions Bot commented Apr 8, 2026

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

Contributor

Copilot AI left a comment

Pull request overview

This PR optimizes the Airflow DB-backed ingestion path by eliminating an N+1 query pattern when collecting task instances for recent DAG runs, switching from per-DagRun task-instance queries to a single bulk query per DAG.

Changes:

  • Updated AirflowSource.get_task_instances to accept multiple run_ids and return task instances grouped by run_id.
  • Refactored yield_pipeline_status to bulk-fetch task instances once per DAG and reuse grouped results per DagRun.
  • Added a unit test intended to validate the bulk query + grouping behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py | Implements the bulk run_id IN (...) query and per-run grouping; updates yield_pipeline_status to use the grouped results. |
| ingestion/tests/unit/topology/pipeline/test_airflow.py | Adds a unit test for the new bulk task-instance retrieval behavior. |

@RajdeepKushwaha5 RajdeepKushwaha5 force-pushed the fix/airflow-n1-task-instances-bulk-query branch from 011b336 to e1025f2 Compare April 8, 2026 04:26
Copilot AI review requested due to automatic review settings April 8, 2026 04:30
@RajdeepKushwaha5 RajdeepKushwaha5 force-pushed the fix/airflow-n1-task-instances-bulk-query branch from e1025f2 to 1023bb6 Compare April 8, 2026 04:30
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

@harshach harshach added the safe to test label Apr 8, 2026
@harshach
Collaborator

harshach commented Apr 8, 2026

@RajdeepKushwaha5 did you run this against an Airflow instance and make sure that all DAGs are listed properly without any regression?

@github-actions
Contributor

github-actions Bot commented Apr 9, 2026

🟡 Playwright Results — all passed (29 flaky)

✅ 3658 passed · ❌ 0 failed · 🟡 29 flaky · ⏭️ 89 skipped

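| Shard | Passed | Failed | Flaky | Skipped |
|---|---|---|---|---|
| 🟡 Shard 1 | 474 | 0 | 7 | 4 |
| 🟡 Shard 2 | 648 | 0 | 5 | 7 |
| 🟡 Shard 3 | 653 | 0 | 6 | 1 |
| 🟡 Shard 4 | 632 | 0 | 2 | 27 |
| 🟡 Shard 5 | 609 | 0 | 2 | 42 |
| 🟡 Shard 6 | 642 | 0 | 7 | 8 |
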
🟡 29 flaky test(s) (passed on retry)
  • Features/CustomizeDetailPage.spec.ts › Database Schema - customization should work (shard 1, 1 retry)
  • Features/NavigationBlocker.spec.ts › should navigate to new page when "Leave" is clicked (shard 1, 1 retry)
  • Flow/Metric.spec.ts › Verify Related Metrics Update (shard 1, 1 retry)
  • Flow/Tour.spec.ts › Tour should work from help section (shard 1, 1 retry)
  • Flow/Tour.spec.ts › Tour should work from URL directly (shard 1, 1 retry)
  • Pages/AuditLogs.spec.ts › should apply both User and EntityType filters simultaneously (shard 1, 1 retry)
  • Pages/UserCreationWithPersona.spec.ts › Create user with persona and verify on profile (shard 1, 1 retry)
  • Features/BulkEditEntity.spec.ts › Glossary (shard 2, 1 retry)
  • Features/DataQuality/TestCaseImportExportE2eFlow.spec.ts › Admin: Complete export-import-validate flow (shard 2, 1 retry)
  • Features/DataQuality/TestCaseImportExportE2eFlow.spec.ts › EditAll User: Complete export-import-validate flow (shard 2, 1 retry)
  • Features/DataQuality/TestCaseResultPermissions.spec.ts › User with TEST_CASE.EDIT_ALL can see edit action on test case (shard 2, 1 retry)
  • Features/DataQuality/TestCaseResultPermissions.spec.ts › User with only VIEW cannot PATCH results (shard 2, 1 retry)
  • Features/LandingPageWidgets/DomainDataProductsWidgets.spec.ts › Data Product asset count should update when assets are added (shard 3, 1 retry)
  • Features/RestoreEntityInheritedFields.spec.ts › Validate restore with Inherited domain and data products assigned (shard 3, 2 retries)
  • Features/RestoreEntityInheritedFields.spec.ts › Validate restore with Inherited domain and data products assigned (shard 3, 1 retry)
  • Features/RestoreEntityInheritedFields.spec.ts › Validate restore with Inherited domain and data products assigned (shard 3, 1 retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 1 retry)
  • Flow/ServiceForm.spec.ts › Verify form selects are working properly (shard 3, 1 retry)
  • Pages/Customproperties-part2.spec.ts › entityReferenceList shows item count, scrollable list, no expand toggle (shard 4, 1 retry)
  • Pages/Entity.spec.ts › Inactive Announcement create & delete (shard 4, 1 retry)
  • Pages/EntityDataConsumer.spec.ts › Tag Add, Update and Remove (shard 5, 1 retry)
  • Pages/EntityDataSteward.spec.ts › Tier Add, Update and Remove (shard 5, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › verify create lineage for entity - Search Index (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageRightPanel.spec.ts › Verify custom properties tab IS visible for supported type: searchIndex (shard 6, 1 retry)
  • Pages/UserDetails.spec.ts › Subdomain is visible when expanding parent domain in tree (shard 6, 1 retry)
  • Pages/Users.spec.ts › Permissions for table details page for Data Consumer (shard 6, 1 retry)
  • Pages/Users.spec.ts › Check permissions for Data Steward (shard 6, 1 retry)
  • VersionPages/EntityVersionPages.spec.ts › Directory (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment on lines +339 to +346
result: Dict[str, List[OMTaskInstance]] = defaultdict(list)

# Short-circuit: avoid building and executing a query with an empty
# IN(...) list - unnecessary DB round-trip and rejected by some SQL
# dialects. Caller (yield_pipeline_status) already guards this, but
# defend at the boundary as well.
if not run_ids:
    return result
Copilot AI Apr 17, 2026

get_task_instances is annotated to return Dict[str, List[OMTaskInstance]], but it actually returns a defaultdict(list). Returning a defaultdict can introduce subtle side effects for callers (e.g., result[missing_key] will create keys instead of raising), and it’s inconsistent with the declared return type. Consider using DefaultDict[...] internally and converting to a plain dict on return (or change the return annotation if you intend to expose defaultdict).
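The side effect described in this review comment can be reproduced with a few lines of standard-library Python. The as_plain_dict helper is a hypothetical name used for illustration; it shows one way to follow the suggestion, not necessarily what the PR ended up doing.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List

grouped: DefaultDict[str, List[str]] = defaultdict(list)
grouped["run_1"].append("extract")

# .get() does not trigger the default factory ...
assert grouped.get("run_2", []) == []
assert "run_2" not in grouped

# ... but subscripting a missing key silently inserts it.
_ = grouped["run_2"]
assert "run_2" in grouped


def as_plain_dict(d: DefaultDict[str, List[str]]) -> Dict[str, List[str]]:
    """Convert the grouping so that missing keys raise KeyError again."""
    return dict(d)


plain = as_plain_dict(grouped)
try:
    plain["run_3"]
except KeyError:
    print("plain dict raises KeyError for missing keys")
```

Callers that only use .get(run_id, []) are unaffected either way; the conversion matters for callers that subscript the result directly.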

@RajdeepKushwaha5
Contributor Author

Hi @IceS2
The only failing check is Postgresql PR Playwright E2E Tests / playwright-ci-postgresql (3, 6); the other 5 Playwright shards all passed, and so did SonarCloud (Quality Gate passed, 100% coverage on new code, 0 new issues), Java/Python checkstyle, and SonarCloud code scanning (no new alerts in changed code).

The shard-3 failure looks like Playwright flakiness unrelated to this PR's changes.
Could you re-trigger that shard when you have a moment?

@IceS2
Contributor

IceS2 commented Apr 17, 2026

> Hi @IceS2 The only failing check is Postgresql PR Playwright E2E Tests / playwright-ci-postgresql (3, 6); the other 5 Playwright shards all passed, and so did SonarCloud (Quality Gate passed, 100% coverage on new code, 0 new issues), Java/Python checkstyle, and SonarCloud code scanning (no new alerts in changed code).
>
> The shard-3 failure looks like Playwright flakiness unrelated to this PR's changes. Could you re-trigger that shard when you have a moment?

I had already done that; let's see if it passes. Otherwise, I will check whether it is a known issue.

@gitar-bot

gitar-bot Bot commented Apr 19, 2026

Code Review ✅ Approved 2 resolved / 2 findings

Bulk-fetch implementation optimizes TaskInstance retrieval by querying per DAG to eliminate N+1 performance overhead. Test suite issues regarding read-only property assignment and incorrect mock return types have been addressed.

✅ 2 resolved
Bug: Test assigns to read-only property session, will raise AttributeError

📄 ingestion/tests/unit/topology/pipeline/test_airflow.py:1047-1054 📄 ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py:223-231
session is a read-only @property (no setter) on AirflowSource (metadata.py:223-231). The test at line 1048 does self.airflow.session = mock_session, which will raise AttributeError: can't set attribute at runtime, meaning this new test will always fail.

The underlying _session private attribute is what should be patched instead.
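The failure mode in this finding is plain Python behaviour and can be shown without Airflow. The Source class below is a hypothetical stand-in with the same shape (a session property with no setter, backed by _session); it is not the real AirflowSource.

```python
class Source:
    """Hypothetical class exposing a read-only session property."""

    def __init__(self) -> None:
        self._session = None

    @property
    def session(self):
        return self._session


src = Source()

# Assigning to a property that has no setter raises AttributeError.
try:
    src.session = "mock"
except AttributeError:
    print("cannot assign to a property without a setter")

# Patching the backing attribute works and is visible through the property.
mock_session = object()
src._session = mock_session
assert src.session is mock_session
```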

Bug: Existing tests mock get_task_instances with wrong return type

📄 ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py:382-390 📄 ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py:396
The signature of get_task_instances changed from returning List[OMTaskInstance] to Dict[str, List[OMTaskInstance]]. However, two tests in tests/unit/airflow/test_airflow_metadata.py (not part of this diff) still mock it as MagicMock(return_value=[]). Since yield_pipeline_status now calls tasks_by_run_id.get(dag_run.run_id, []), calling .get() on a list will raise AttributeError. These tests will break.
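The breakage described here follows directly from the return-type change and can be sketched with unittest.mock alone. The mock names are illustrative; only the MagicMock(return_value=[]) form and the .get(run_id, []) lookup come from the finding above.

```python
from unittest.mock import MagicMock

# Old-style mock: returns a list, as the previous signature did.
old_mock = MagicMock(return_value=[])
tasks_by_run_id = old_mock(["run_1"])
try:
    tasks_by_run_id.get("run_1", [])
except AttributeError as exc:
    print(f"list has no .get(): {exc}")

# Updated mock: returns a dict keyed by run_id, matching the new signature.
new_mock = MagicMock(return_value={})
assert new_mock(["run_1"]).get("run_1", []) == []
```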

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

@RajdeepKushwaha5
Contributor Author

ping @IceS2

Development

Successfully merging this pull request may close these issues.

Airflow ingestion fires N extra DB queries (one per DagRun) to fetch TaskInstances inside yield_pipeline_status

5 participants